您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 10551|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;  }, f6 @+ n$ s( ?$ x8 d7 G
  2. require_once __DIR__ . '/Workerman/Autoloader.php';
    7 K) Z5 Q# Z6 B- E5 N3 G6 f, g. v' p
  3. 3 \9 W, K" Q" k  {, `
  4. $worker = new Worker();3 A% j/ Q4 L/ ]7 W1 J
  5. // 4个进程: q% G6 h1 U$ X  w' Y* I8 }
  6. $worker->count = 4;
    . @0 r! a1 I: D( H
  7. // 每个进程启动后在当前进程新增一个Worker监听. c5 V9 `2 q" O
  8. $worker->onWorkerStart = function($worker)
    $ p7 n2 M; f1 a/ z5 p7 T. Z" t6 Z
  9. {+ h$ ?4 U% f6 @6 ^6 x% ?$ c& K: f7 X0 Z
  10.     /**% g" n% ]2 V- ~8 T7 r8 M# T) Z
  11.      * 4个进程启动的时候都创建2016端口的Worker
    8 Q' A3 X8 G5 l/ E* ?* t
  12.      * 当执行到worker->listen()时会报Address already in use错误
    - I6 O# q4 ^# h
  13.      * 如果worker->count=1则不会报错
    ! G: ?- W/ F0 o, p' X7 ^1 n- O
  14.      */+ _/ X5 h! u: k& T# p
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');! H0 C: m# H) M! |, L3 P
  16.     $inner_worker->onMessage = 'on_message';
    7 P$ U# e& U  o/ ]4 F
  17.     // 执行监听。这里会报Address already in use错误
    - A* f- [& [# V: T* i
  18.     $inner_worker->listen();
    & R- X" A. ~, B, `, g( _9 G# Q/ e
  19. };
    $ m! O. n% b* v, c/ Y$ X

  20. ' h1 H: p( O( Y8 H
  21. $worker->onMessage = 'on_message';( {8 R! G3 }* |4 A/ o! m1 x
  22. 6 q; q# m! |3 d( [. q8 d' d
  23. function on_message($connection, $data)
    % M* ]4 s) I7 ~
  24. {" l6 S1 H9 ~# s
  25.     $connection->send("hello\n");% _) F8 F9 \1 ^4 i7 u
  26. }- U  b7 E' h& E1 z4 y4 p3 h

  27. $ e( H) ?$ _2 Q7 ~2 Q$ O
  28. // 运行worker. e3 l  ^% O8 \  V' Z) `/ r4 @: C: h* _
  29. Worker::runAll();- g  G+ d" G" ~( |! A. k! h# A; `
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:: w% k. M6 q2 F' F
  31.   D1 g- y5 ^5 k$ i# Y
  32. use Workerman\Worker;1 B2 U% B5 g. d3 V
  33. require_once './Workerman/Autoloader.php';  r; r% k4 Y1 @) h- t  A3 L

  34. ' u3 i% W2 T# `, Q  Q
  35. $worker = new Worker('text://0.0.0.0:2015');9 u8 e, A5 Z1 t' G& Q
  36. // 4个进程' w% ~) O/ \" z2 a, ]" Q5 L4 K
  37. $worker->count = 4;
    ! V2 F6 B8 y: U( U- b: |! A
  38. // 每个进程启动后在当前进程新增一个Worker监听- D# [5 h$ j, |1 z8 Z
  39. $worker->onWorkerStart = function($worker): h1 W2 y1 W# p& \! m
  40. {& _0 _0 h! _; h- j
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');3 a% I6 X% h2 A$ ^
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
    . j4 f" h) ?6 H) D2 G9 P) F" Z7 }8 Y
  43.     $inner_worker->reusePort = true;$ @& _" C) A# V9 a, G/ g
  44.     $inner_worker->onMessage = 'on_message';
    1 T' P9 K. ?) u6 H0 I* S2 j0 W
  45.     // 执行监听。正常监听不会报错( i% x/ s* B- [2 z: [
  46.     $inner_worker->listen();1 s2 p# R* S" |8 s1 U
  47. };$ P$ h# ]7 ~7 M- j
  48. % N9 ~# u/ h; N! V0 x: O, N7 Z" [
  49. $worker->onMessage = 'on_message';
    * V$ j% i. \6 `' C$ k2 h

  50. ' i0 E) Y3 Z8 R" Q4 Q) R
  51. function on_message($connection, $data)
    ; E% g( e. _3 w2 \5 z5 U
  52. {
    ) [5 d" L1 r: I0 G
  53.     $connection->send("hello\n");
    . `% ^6 K1 J7 u' `* s
  54. }6 }) j/ O: A4 v$ D0 _

  55. ( J, O* @, N  _, s! D& h
  56. // 运行worker1 b3 B4 K: M8 C% z. U5 {, b
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php) w( a7 J4 k; w
  2. use Workerman\Worker;
    / k. W/ B( \3 R6 T1 v4 o
  3. require_once './Workerman/Autoloader.php';; ]5 c% M" {5 N" M4 |3 e% s1 x, J
  4. // 初始化一个worker容器,监听1234端口
    + U2 F0 @2 p8 X! I' s. p) \3 ]. T
  5. $worker = new Worker('websocket://0.0.0.0:1234');4 @) H7 v; v. E
  6. 8 r. L1 a+ w& M6 \
  7. /*6 j" g" M+ b; q4 D- ?
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误' r( w4 x) W1 y1 ?+ Z3 f6 `
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)+ }" _$ i1 O2 K: I# G
  10. */) Y. N( d+ n: R6 W2 @
  11. $worker->count = 1;. d9 e: I6 P# U- J: @
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口3 |) h7 x' T7 ]' V
  13. $worker->onWorkerStart = function($worker)( G% Y3 e, E+ R; u3 w
  14. {
    6 }; m' }- T( t* E/ N3 i
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
    ! d8 A( k  o% b2 m( Z2 A3 `7 _
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');
    5 _7 m" d; n6 Z7 a
  17.     $inner_text_worker->onMessage = function($connection, $buffer)7 G$ M; o9 n: n5 O
  18.     {
    ( G4 @. n5 s" M0 [% V2 P
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据7 w- M% i( s0 t8 G! L, p& L- n$ T
  20.         $data = json_decode($buffer, true);
    5 S' J' a; I+ Y' f0 @- [5 ?
  21.         $uid = $data['uid'];
    6 V- Z& X8 `/ M- A* ^
  22.         // 通过workerman,向uid的页面推送数据$ E! o- d2 y( P% c  C& C) ?
  23.         $ret = sendMessageByUid($uid, $buffer);; u( R& C0 {7 [; B" x
  24.         // 返回推送结果
    # h# v  V6 _7 w* Y% W
  25.         $connection->send($ret ? 'ok' : 'fail');
    : w2 k  G; `8 Y, z; O- `
  26.     };* ~6 `; h# E7 y- }6 y9 [
  27.     // ## 执行监听 ##
    ' F3 B, t  t- |, ]% v4 K( @/ i2 T/ @% \
  28.     $inner_text_worker->listen();
    / h$ ?. x; w" E4 B
  29. };
    ! r- R; q- a/ f2 N7 Q1 x. V
  30. // 新增加一个属性,用来保存uid到connection的映射& }9 U& B# t) `# a
  31. $worker->uidConnections = array();' a9 A; a8 P- t0 u
  32. // 当有客户端发来消息时执行的回调函数
    , f; C) T6 n" I2 G" ]
  33. $worker->onMessage = function($connection, $data)  i% P9 R# v+ C6 W
  34. {
    2 {4 I6 N: r9 i2 f
  35.     global $worker;8 E+ j6 L3 s$ k3 T
  36.     // 判断当前客户端是否已经验证,既是否设置了uid3 M6 z0 M& D$ F5 @% j6 x
  37.     if(!isset($connection->uid))
    7 K( Y- U  g% R' K+ b
  38.     {
    3 z' {" n. A. t( g. S
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)+ C2 A  b& P6 K9 k& j& L
  40.        $connection->uid = $data;
    ) U, ]+ \1 n. ~3 G8 l
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
    : @2 a' v1 F' t' \2 @) a
  42.         * 实现针对特定uid推送数据8 G8 s& T6 L1 d9 z
  43.         */8 r1 g: @5 W/ g- [
  44.        $worker->uidConnections[$connection->uid] = $connection;
    " ~+ v' ~9 P* [! Y* M0 n) L+ h7 r
  45.        return;
    0 k" V& I. f5 Z$ Z+ D7 ^  G
  46.     }
    5 i6 L2 C, j) \7 \8 P
  47. };! }) D: A; J0 N2 Z8 U* b( a# Y
  48. & J0 `5 `& r' S9 ?0 i# {
  49. // 当有客户端连接断开时
    # g' X+ G5 _1 u- h$ e) P' p
  50. $worker->onClose = function($connection)9 I: T/ l$ ^6 d8 ]3 B8 Q
  51. {: ?7 }' G; |4 G$ B: ~* w
  52.     global $worker;7 v3 ?$ b# _3 P) B- ~/ h1 \# K9 X  V
  53.     if(isset($connection->uid)); Z/ V0 H1 n& ]; [" S: ]
  54.     {
    ) D: G0 v# Z3 \+ h* N; X
  55.         // 连接断开时删除映射6 E& }  z! N$ i. h
  56.         unset($worker->uidConnections[$connection->uid]);0 B. \8 D' Q) m. {3 a3 L  u
  57.     }
    ) I. K" a# o: P5 j; s7 O% q
  58. };7 `1 U7 q& ]: [; o" U

  59. % K8 x/ F# b9 o/ {* B
  60. // 向所有验证的用户推送数据
    ' U) ?; j, z+ u$ p
  61. function broadcast($message)
    0 _; a( j/ h8 w/ s' `( m9 V
  62. {
      C8 ]# R4 D4 M
  63.    global $worker;
    2 U$ y% b( t0 a# i
  64.    foreach($worker->uidConnections as $connection)* C. _7 l, n; `' |
  65.    {
    ; l5 w% L4 }5 ~- m8 H
  66.         $connection->send($message);+ W, I, q3 y0 z- D5 u4 V! v. h
  67.    }
    3 m7 ^  X. J+ y- m5 r$ Y
  68. }
    1 u2 X( p' Y/ V0 d/ v; d; d5 Z2 `
  69. " t3 g4 X6 s( [" `' y/ Q' p" ]
  70. // 针对uid推送数据( M) F9 Y8 y' g# d
  71. function sendMessageByUid($uid, $message)# Q1 D4 B: r8 j- k5 x  q) c
  72. {2 v" s0 J9 Z# e# _' P0 L% G
  73.     global $worker;( M0 H# v) f7 j7 S! P3 g8 u
  74.     if(isset($worker->uidConnections[$uid]))5 b* L: S9 R7 \
  75.     {
    2 b' j: q* b( F) X7 [
  76.         $connection = $worker->uidConnections[$uid];1 `5 P8 C6 k& p& t( Z, R  S
  77.         $connection->send($message);9 E* h; h7 x9 Y2 A2 T$ M# l
  78.         return true;) `3 _! N3 k0 X& m/ y: H2 A
  79.     }
    8 ^* }) V8 B! ]% Y" P% Q+ x0 Y
  80.     return false;; z7 I( ^& l* `: h
  81. }
    / `/ N, ?. a. y7 P

  82. ) P. K* d2 q4 X7 [$ r4 M
  83. // 运行所有的worker
    ( L1 L" P# c9 b
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');* ?' N: z9 @# T' w+ T/ a
  2. ws.onopen = function(){
    : q& y+ |6 q9 u+ u9 R1 j
  3.     var uid = 'uid1';& Y  {& [9 t* O/ e
  4.     ws.send(uid);" a% @) j0 n. |+ \+ W+ h
  5. };5 G9 i, q! R) R' Z2 C7 T
  6. ws.onmessage = function(e){+ {' A6 k; `! C
  7.     alert(e.data);& ]6 B; r. s( b$ g8 h6 e  |
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口
    + Y2 V7 d7 g- Z: g$ m9 A* S
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);3 v; d: o1 w9 @9 ?& x
  3. // 推送的数据,包含uid字段,表示是给这个uid推送( g! |% c: n7 p  Z
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');
    & a0 J) U( u  c6 q& W. S/ V
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符3 ]* w7 {4 \! m' }
  6. fwrite($client, json_encode($data)."\n");$ ~* }3 x, x* I9 U2 N* Y$ {9 ~6 M
  7. // 读取推送结果
    ( ?, g2 O/ t9 ~& M; J% c
  8. echo fread($client, 8192);
复制代码
  M: |$ y2 h% c) g
6 a1 ^5 Q8 s& O3 e
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2024-5-19 15:28 , Processed in 0.130687 second(s), 21 queries .

Copyright © 2001-2024 Powered by cncml! X3.2. Theme By cncml!